/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.fone.flumeExt.sink.L2HSink;

//import java.io.BufferedOutputStream;
//import java.io.FileOutputStream;
import java.io.File;
import java.io.IOException;
//import java.io.OutputStream;
import java.util.Calendar;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.TimeZone;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Channel;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.SystemClock;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * <p>Title: LocalToHDFSSink.java</p>
 * A Flume sink that writes incoming events into local bucket files
 * ({@code LocalFileWrite}) under a configurable staging directory, and starts a
 * cron-triggered job ({@code CronTriggerFileHDFS}) that periodically moves the
 * completed files to HDFS.
 *
 * @author Phoenics Chow
 * @version 1.0
 * @date 2014-6-3
 */
public class LocalToHDFSSink extends AbstractSink implements Configurable {
	/*
	 * public interface WriterCallback { public void run(String filePath); }
	 */

	/** The Constant LOG. */
	private static final Logger LOG = LoggerFactory.getLogger(LocalToHDFSSink.class);

	/** The directory delimiter. */
	private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");

	/** The Constant defaultRollInterval. */
	private static final long defaultRollInterval = 30;
	
	/** The Constant defaultRollSize. */
	private static final long defaultRollSize = 1024;
	
	/** The Constant defaultRollCount. */
	private static final long defaultRollCount = 10;
	
	/** The Constant defaultFileName. */
	private static final String defaultFileName = "FlumeData";
	
	/** The Constant defaultSuffix. */
	private static final String defaultSuffix = "";
	
	/** The Constant defaultInUsePrefix. */
	private static final String defaultInUsePrefix = "";
	
	/** The Constant defaultInUseSuffix. */
	private static final String defaultInUseSuffix = ".tmp";
	
	/** The Constant defaultBatchSize. */
	private static final long defaultBatchSize = 100;
	
	/** The Constant defaultCronExpression. */
	private static final String defaultCronExpression = "0 0/10  * * * ?";
	
	/** The Constant defaultMaxOpenFiles. */
	private static final int defaultMaxOpenFiles = 5000;

	/**
	 * Default length of time we wait for blocking BucketWriter calls before
	 * timing out the operation. Intended to prevent server hangs.
	 */
	private static final long defaultCallTimeout = 10000;
	/**
	 * Default number of threads available for tasks such as
	 * append/open/close/flush with hdfs. These tasks are done in a separate
	 * thread in the case that they take too long. In which case we create a new
	 * file and move on.
	 */
	private static final int defaultThreadPoolSize = 10;
	
	/** The Constant defaultRollTimerPoolSize. */
	private static final int defaultRollTimerPoolSize = 1;

	/** The sf writers. */
	private LinkedHashMap<String, LocalFileWrite> sfWriters;

	/** The roll interval. */
	private long rollInterval;
	
	/** The roll size. */
	private long rollSize;
	
	/** The roll count. */
	private long rollCount;
	
	/** The batch size. */
	private long batchSize;
	
	/** The roll timer pool size. */
	private int rollTimerPoolSize;
	
	/** The file path. */
	private String filePath;
	
	/** The hdfs path. */
	private String hdfsPath;
	
	/** The middle dir. */
	private String middleDir;
	
	/** The cron expression. */
	private String cronExpression;
	
	/** The file name. */
	private String fileName;
	
	/** The suffix. */
	private String suffix;
	
	/** The in use prefix. */
	private String inUsePrefix;
	
	/** The in use suffix. */
	private String inUseSuffix;
	
	/** The time zone. */
	private TimeZone timeZone;
	
	/** The call timeout pool. */
	private ExecutorService callTimeoutPool;
	
	/** The timed roller pool. */
	private ScheduledExecutorService timedRollerPool;
	
	/** The max open files. */
	private int maxOpenFiles;
	
	/** The threads pool size. */
	private int threadsPoolSize;
	
	/** The need rounding. */
	private boolean needRounding = false;
	
	/** The round unit. */
	private int roundUnit = Calendar.SECOND;
	
	/** The round value. */
	private int roundValue = 1;
	
	/** The use local time. */
	private boolean useLocalTime = false;
	
	/** The is keep. */
	private boolean isKeep = false;

	/** The call timeout. */
	private long callTimeout;
	
	/** The context. */
	private Context context;
	
	/** The sink counter. */
	private SinkCounter sinkCounter;

	/** The idle timeout. */
	private volatile int idleTimeout;

	/** The cron trigger file hdfs. */
	private CronTriggerFileHDFS cronTriggerFileHDFS = null;
	
	/** The clock. */
	@SuppressWarnings("unused")
	private Clock clock;

	/**
	 * <p>Title: LocalToHDFSSink.java</p>
	 * The Interface WriterCallback.
	 * <p>Description: </p>
	 *
	 * @author Phoenics Chow
	 * @version 1.0
	 * @date 2014-6-3
	 */
	public interface WriterCallback {
		
		/**
		 * Run.
		 *
		 * @param filePath the file path
		 */
		public void run(String filePath);
	}

	/**
	 * Instantiates a new local to hdfs sink.
	 */
	public LocalToHDFSSink() {
	}

	// read configuration and setup thresholds
	/* (non-Javadoc)
	 * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
	 */
	@Override
	public void configure(Context context) {
		this.context = context;
		// 写入hdfs需要的变量 ：1，hdfs 目录。2，本地目录。 3，定时器crontab 字符串。
		// filePath hdfsPath cronExpression isKeep
		hdfsPath = Preconditions.checkNotNull(context.getString("hdfs.directory"), "hdfs.directory is required");
		filePath = Preconditions.checkNotNull(context.getString("local.directory"), "local.directory is required");
		middleDir = context.getString("local.middleDir", "");
		cronExpression = context.getString("local.cronExpression", defaultCronExpression);
		fileName = context.getString("local.filePrefix", defaultFileName);
		this.suffix = context.getString("local.fileSuffix", defaultSuffix);
		inUsePrefix = context.getString("local.inUsePrefix", defaultInUsePrefix);
		inUseSuffix = context.getString("local.inUseSuffix", defaultInUseSuffix);
		String tzName = context.getString("local.timeZone");
		timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
		rollInterval = context.getLong("local.rollInterval", defaultRollInterval);
		rollSize = context.getLong("local.rollSize", defaultRollSize);
		rollCount = context.getLong("local.rollCount", defaultRollCount);
		batchSize = context.getLong("local.batchSize", defaultBatchSize);
		idleTimeout = context.getInteger("local.idleTimeout", 0);
		maxOpenFiles = context.getInteger("local.maxOpenFiles", defaultMaxOpenFiles);
		threadsPoolSize = context.getInteger("local.threadsPoolSize", defaultThreadPoolSize);

		callTimeout = context.getLong("local.callTimeout", defaultCallTimeout);
		rollTimerPoolSize = context.getInteger("local.rollTimerPoolSize", defaultRollTimerPoolSize);
		Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
		needRounding = context.getBoolean("local.round", false);
		isKeep = context.getBoolean("local.isKeep", false);
		if (needRounding) {
			String unit = context.getString("local.roundUnit", "second");
			if (unit.equalsIgnoreCase("hour")) {
				this.roundUnit = Calendar.HOUR_OF_DAY;
			} else if (unit.equalsIgnoreCase("minute")) {
				this.roundUnit = Calendar.MINUTE;
			} else if (unit.equalsIgnoreCase("second")) {
				this.roundUnit = Calendar.SECOND;
			} else {
				LOG.warn("Rounding unit is not valid, please set one of minute, hour, or second. Rounding will be disabled");
				needRounding = false;
			}
			this.roundValue = context.getInteger("local.roundValue", 1);
			if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
				Preconditions.checkArgument(roundValue > 0 && roundValue <= 60, "Round value" + "must be > 0 and <= 60");
			} else if (roundUnit == Calendar.HOUR_OF_DAY) {
				Preconditions.checkArgument(roundValue > 0 && roundValue <= 24, "Round value" + "must be > 0 and <= 24");
			}
		}

		this.useLocalTime = context.getBoolean("local.useLocalTimeStamp", false);
		if (useLocalTime) {
			clock = new SystemClock();
		}
		if (sinkCounter == null) {
			sinkCounter = new SinkCounter(getName());
		}
	}

	/* (non-Javadoc)
	 * @see org.apache.flume.Sink#process()
	 */
	@Override
	public Status process() throws EventDeliveryException {
		Channel channel = getChannel();
		Transaction transaction = channel.getTransaction();
		List<LocalFileWrite> writers = Lists.newArrayList();
		List<String> lookupPathes = Lists.newArrayList();
		transaction.begin();
		try {
			int txnEventCount = 0;
			for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
				Event event = channel.take();
				if (event == null) {
					break;
				}
				// reconstruct the path name by substituting place holders
				String realPath = BucketPath.escapeString(middleDir, event.getHeaders(), timeZone, needRounding, roundUnit, roundValue, useLocalTime);
				realPath = filePath + DIRECTORY_DELIMITER + realPath;
				String realName = BucketPath.escapeString(fileName, event.getHeaders(), timeZone, needRounding, roundUnit, roundValue, useLocalTime);
				String lookupPath = realPath + DIRECTORY_DELIMITER + realName;

				LocalFileWrite bucketWriter = sfWriters.get(lookupPath);
				// we haven't seen this file yet, so open it and cache the
				// handle
				if (bucketWriter == null) {
					LOG.info("开始新的OutputStream，写输出文件设置{}", lookupPath);
					@SuppressWarnings("unused")
					WriterCallback idleCallback = null;
					if (idleTimeout != 0) {
						LOG.info("idleCallback remove bucketPath");
						idleCallback = new WriterCallback() {
							@Override
							public void run(String bucketPath) {
								sfWriters.remove(bucketPath);
							}
						};
					}
					// long rollInterval, long rollSize, long rollCount, long
					// batchSize, Context context, String filePath, String
					// fileName,
					// String inUsePrefix, String inUseSuffix, String
					// fileSuffix, ScheduledExecutorService timedRollerPool
					File file = new File(realPath);
					if (!file.exists()) {
						file.mkdirs();
					}
					bucketWriter = new LocalFileWrite(rollInterval, rollSize, rollCount, batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
							suffix, timedRollerPool);
					sfWriters.put(lookupPath, bucketWriter);
				}

				// track the buckets getting written in this transaction
				if (!writers.contains(bucketWriter)) {
					writers.add(bucketWriter);
				}
				if (!lookupPathes.contains(lookupPath)) {
					lookupPathes.add(lookupPath);
				}
				bucketWriter.append(event);
			}

			if (txnEventCount == 0) {
				sinkCounter.incrementBatchEmptyCount();
			} else if (txnEventCount == batchSize) {
				sinkCounter.incrementBatchCompleteCount();
			} else {
				sinkCounter.incrementBatchUnderflowCount();
			}

			// flush all pending buckets before committing the transaction
			for (LocalFileWrite bucketWriter : writers) {
				bucketWriter.flush();
			}
			transaction.commit();
			if (txnEventCount < 1) {
				return Status.BACKOFF;
			} else {
				sinkCounter.addToEventDrainSuccessCount(txnEventCount);
				return Status.READY;
			}
		} catch (IOException eIO) {
			transaction.rollback();
			LOG.warn("Local IO error", eIO);
			for (String p : lookupPathes) {
				LOG.warn("删除这个地址({})指向的bucketWriter对象.", p);
				sfWriters.remove(p);
			}
			lookupPathes.clear();
			throw new EventDeliveryException();
			// return Status.BACKOFF;

		} catch (Throwable th) {
			transaction.rollback();
			LOG.error("process failed", th);
			if (th instanceof Error) {
				throw (Error) th;
			} else {
				throw new EventDeliveryException(th);
			}
		} finally {
			transaction.close();
		}
	}

	/* (non-Javadoc)
	 * @see org.apache.flume.sink.AbstractSink#stop()
	 */
	@Override
	public void stop() {
		// do not constrain close() calls with a timeout
		for (Entry<String, LocalFileWrite> entry : sfWriters.entrySet()) {
			LOG.info("Closing {}", entry.getKey());

			try {
				entry.getValue().close();
			} catch (Exception ex) {
				LOG.warn("Exception while closing " + entry.getKey() + ". " + "Exception follows.", ex);
				if (ex instanceof InterruptedException) {
					Thread.currentThread().interrupt();
				}
			}
		}

		// shut down all our thread pools
		ExecutorService toShutdown[] = { callTimeoutPool, timedRollerPool };
		for (ExecutorService execService : toShutdown) {
			execService.shutdown();
			try {
				while (execService.isTerminated() == false) {
					execService.awaitTermination(Math.max(defaultCallTimeout, callTimeout), TimeUnit.MILLISECONDS);
				}
			} catch (InterruptedException ex) {
				LOG.warn("shutdown interrupted on " + execService, ex);
			}
		}
		callTimeoutPool = null;
		timedRollerPool = null;
		sfWriters.clear();
		sfWriters = null;
		sinkCounter.stop();
		if (cronTriggerFileHDFS != null) {
			try {
				cronTriggerFileHDFS.shutdown();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		super.stop();
	}

	/* (non-Javadoc)
	 * @see org.apache.flume.sink.AbstractSink#start()
	 */
	@Override
	public void start() {
		String timeoutName = "local2hdfs-" + getName() + "-call-runner-%d";
		callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
		String rollerName = "local2hdfs-" + getName() + "-roll-timer-%d";
		timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize, new ThreadFactoryBuilder().setNameFormat(rollerName).build());
		this.sfWriters = new LinkedHashMap<>(maxOpenFiles);
		findTmpFile(filePath);
		CronTriggerFileHDFS cronTriggerFileHDFS = new CronTriggerFileHDFS();
		LOG.info("定时器设置,cron expression :{} .", cronExpression);
		try {
			cronTriggerFileHDFS.run(filePath, hdfsPath, cronExpression, isKeep);
		} catch (Exception e) {
			LOG.warn("向hdfs写文件的定时器错误，错误：{}.", e);
		}
		sinkCounter.start();
		super.start();
	}

	/* (non-Javadoc)
	 * @see org.apache.flume.sink.AbstractSink#toString()
	 */
	@Override
	public String toString() {
		return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() + " }";
	}

	/**
	 * Find tmp file.
	 *
	 * @param basePath the base path
	 */
	private void findTmpFile(String basePath) {
		File baseFile = new File(basePath);
		File[] fd = baseFile.listFiles();
		for (File d : fd) {
			if (d.isFile()) {
				String filefullName = d.getPath();
				if (filefullName.trim().endsWith(".tmp")) {
					filefullName = filefullName.substring(0, filefullName.length() - 4);
					File fullName = new File(filefullName);
					if (d.renameTo(fullName)) {
						LOG.info("file {}  rename to {}.", d.getPath(), filefullName);
					} else {
						d.delete();
					}
				}
			}
			if (d.isDirectory()) {
				findTmpFile(d.getPath());
			}
		}
	}
}